// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task_scheduler/scheduler_worker_pool_impl.h"

#include <stddef.h>

#include <algorithm>
#include <utility>

#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/lazy_instance.h"
#include "base/memory/ptr_util.h"
#include "base/sequenced_task_runner.h"
#include "base/single_thread_task_runner.h"
#include "base/strings/stringprintf.h"
#include "base/task_scheduler/delayed_task_manager.h"
#include "base/task_scheduler/task_tracker.h"
#include "base/threading/platform_thread.h"
#include "base/threading/thread_local.h"
#include "base/threading/thread_restrictions.h"

namespace base {
namespace internal {

    namespace {

        // SchedulerWorker that owns the current thread, if any.
        LazyInstance<ThreadLocalPointer<const SchedulerWorker>>::Leaky
            tls_current_worker
            = LAZY_INSTANCE_INITIALIZER;

        // SchedulerWorkerPool that owns the current thread, if any.
        LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky
            tls_current_worker_pool
            = LAZY_INSTANCE_INITIALIZER;

        // A task runner that runs tasks with the PARALLEL ExecutionMode.
        class SchedulerParallelTaskRunner : public TaskRunner {
        public:
            // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
            // long as |worker_pool| is alive.
            // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
            SchedulerParallelTaskRunner(const TaskTraits& traits,
                SchedulerWorkerPool* worker_pool)
                : traits_(traits)
                , worker_pool_(worker_pool)
            {
            }

            // TaskRunner:
            bool PostDelayedTask(const tracked_objects::Location& from_here,
                const Closure& closure,
                TimeDelta delay) override
            {
                // Post the task as part of a one-off single-task Sequence.
                return worker_pool_->PostTaskWithSequence(
                    WrapUnique(new Task(from_here, closure, traits_, delay)),
                    make_scoped_refptr(new Sequence), nullptr);
            }

            bool RunsTasksOnCurrentThread() const override
            {
                return tls_current_worker_pool.Get().Get() == worker_pool_;
            }

        private:
            ~SchedulerParallelTaskRunner() override = default;

            const TaskTraits traits_;
            SchedulerWorkerPool* const worker_pool_;

            DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
        };

        // A task runner that runs tasks with the SEQUENCED ExecutionMode.
        class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
        public:
            // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks
            // so long as |worker_pool| is alive.
            // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
            SchedulerSequencedTaskRunner(const TaskTraits& traits,
                SchedulerWorkerPool* worker_pool)
                : traits_(traits)
                , worker_pool_(worker_pool)
            {
            }

            // SequencedTaskRunner:
            bool PostDelayedTask(const tracked_objects::Location& from_here,
                const Closure& closure,
                TimeDelta delay) override
            {
                std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay));
                task->sequenced_task_runner_ref = this;

                // Post the task as part of |sequence_|.
                return worker_pool_->PostTaskWithSequence(std::move(task), sequence_,
                    nullptr);
            }

            bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
                const Closure& closure,
                base::TimeDelta delay) override
            {
                // Tasks are never nested within the task scheduler.
                return PostDelayedTask(from_here, closure, delay);
            }

            bool RunsTasksOnCurrentThread() const override
            {
                return tls_current_worker_pool.Get().Get() == worker_pool_;
            }

        private:
            ~SchedulerSequencedTaskRunner() override = default;

            // Sequence for all Tasks posted through this TaskRunner.
            const scoped_refptr<Sequence> sequence_ = new Sequence;

            const TaskTraits traits_;
            SchedulerWorkerPool* const worker_pool_;

            DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
        };

        // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode.
        class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
        public:
            // Constructs a SchedulerSingleThreadTaskRunner which can be used to post
            // tasks so long as |worker_pool| and |worker| are alive.
            // TODO(robliao): Find a concrete way to manage the memory of |worker_pool|
            // and |worker|.
            SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
                SchedulerWorkerPool* worker_pool,
                SchedulerWorker* worker)
                : traits_(traits)
                , worker_pool_(worker_pool)
                , worker_(worker)
            {
            }

            // SingleThreadTaskRunner:
            bool PostDelayedTask(const tracked_objects::Location& from_here,
                const Closure& closure,
                TimeDelta delay) override
            {
                std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay));
                task->single_thread_task_runner_ref = this;

                // Post the task to be executed by |worker_| as part of |sequence_|.
                return worker_pool_->PostTaskWithSequence(std::move(task), sequence_,
                    worker_);
            }

            bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
                const Closure& closure,
                base::TimeDelta delay) override
            {
                // Tasks are never nested within the task scheduler.
                return PostDelayedTask(from_here, closure, delay);
            }

            bool RunsTasksOnCurrentThread() const override
            {
                return tls_current_worker.Get().Get() == worker_;
            }

        private:
            ~SchedulerSingleThreadTaskRunner() override = default;

            // Sequence for all Tasks posted through this TaskRunner.
            const scoped_refptr<Sequence> sequence_ = new Sequence;

            const TaskTraits traits_;
            SchedulerWorkerPool* const worker_pool_;
            SchedulerWorker* const worker_;

            DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
        };

        // Only used in DCHECKs.
        bool ContainsWorker(
            const std::vector<std::unique_ptr<SchedulerWorker>>& workers,
            const SchedulerWorker* worker)
        {
            auto it = std::find_if(workers.begin(), workers.end(),
                [worker](const std::unique_ptr<SchedulerWorker>& i) {
                    return i.get() == worker;
                });
            return it != workers.end();
        }

    } // namespace

    class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
        : public SchedulerWorker::Delegate {
    public:
        // |outer| owns the worker for which this delegate is constructed.
        // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is
        // called with a non-single-threaded Sequence. |shared_priority_queue| is a
        // PriorityQueue whose transactions may overlap with the worker's
        // single-threaded PriorityQueue's transactions. |index| will be appended to
        // the pool name to label the underlying worker threads.
        SchedulerWorkerDelegateImpl(
            SchedulerWorkerPoolImpl* outer,
            const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
            const PriorityQueue* shared_priority_queue,
            int index);
        ~SchedulerWorkerDelegateImpl() override;

        PriorityQueue* single_threaded_priority_queue()
        {
            return &single_threaded_priority_queue_;
        }

        // SchedulerWorker::Delegate:
        void OnMainEntry(SchedulerWorker* worker) override;
        scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override;
        void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
        TimeDelta GetSleepTimeout() override;
        bool CanDetach(SchedulerWorker* worker) override;

    private:
        SchedulerWorkerPoolImpl* outer_;
        const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;

        // Single-threaded PriorityQueue for the worker.
        PriorityQueue single_threaded_priority_queue_;

        // True if the last Sequence returned by GetWork() was extracted from
        // |single_threaded_priority_queue_|.
        bool last_sequence_is_single_threaded_ = false;

        const int index_;

        DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl);
    };

    SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl()
    {
        // SchedulerWorkerPool should never be deleted in production unless its
        // initialization failed.
        DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty());
    }

    // static
    std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create(
        StringPiece name,
        ThreadPriority thread_priority,
        size_t max_threads,
        IORestriction io_restriction,
        const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
        TaskTracker* task_tracker,
        DelayedTaskManager* delayed_task_manager)
    {
        std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool(
            new SchedulerWorkerPoolImpl(name, io_restriction, task_tracker,
                delayed_task_manager));
        if (worker_pool->Initialize(thread_priority, max_threads,
                re_enqueue_sequence_callback)) {
            return worker_pool;
        }
        return nullptr;
    }

    void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting()
    {
        AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
        while (idle_workers_stack_.Size() < workers_.size())
            idle_workers_stack_cv_for_testing_->Wait();
    }

    void SchedulerWorkerPoolImpl::JoinForTesting()
    {
        for (const auto& worker : workers_)
            worker->JoinForTesting();

        DCHECK(!join_for_testing_returned_.IsSignaled());
        join_for_testing_returned_.Signal();
    }

    scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits(
        const TaskTraits& traits,
        ExecutionMode execution_mode)
    {
        switch (execution_mode) {
        case ExecutionMode::PARALLEL:
            return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this));

        case ExecutionMode::SEQUENCED:
            return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this));

        case ExecutionMode::SINGLE_THREADED: {
            // TODO(fdoray): Find a way to take load into account when assigning a
            // SchedulerWorker to a SingleThreadTaskRunner. Also, this code
            // assumes that all SchedulerWorkers are alive. Eventually, we might
            // decide to tear down threads that haven't run tasks for a long time.
            size_t worker_index;
            {
                AutoSchedulerLock auto_lock(next_worker_index_lock_);
                worker_index = next_worker_index_;
                next_worker_index_ = (next_worker_index_ + 1) % workers_.size();
            }
            return make_scoped_refptr(new SchedulerSingleThreadTaskRunner(
                traits, this, workers_[worker_index].get()));
        }
        }

        NOTREACHED();
        return nullptr;
    }

    void SchedulerWorkerPoolImpl::ReEnqueueSequence(
        scoped_refptr<Sequence> sequence,
        const SequenceSortKey& sequence_sort_key)
    {
        shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
            sequence_sort_key);

        // The thread calling this method just ran a Task from |sequence| and will
        // soon try to get another Sequence from which to run a Task. If the thread
        // belongs to this pool, it will get that Sequence from
        // |shared_priority_queue_|. When that's the case, there is no need to wake up
        // another worker after |sequence| is inserted in |shared_priority_queue_|. If
        // we did wake up another worker, we would waste resources by having more
        // workers trying to get a Sequence from |shared_priority_queue_| than the
        // number of Sequences in it.
        if (tls_current_worker_pool.Get().Get() != this)
            WakeUpOneWorker();
    }

    bool SchedulerWorkerPoolImpl::PostTaskWithSequence(
        std::unique_ptr<Task> task,
        scoped_refptr<Sequence> sequence,
        SchedulerWorker* worker)
    {
        DCHECK(task);
        DCHECK(sequence);
        DCHECK(!worker || ContainsWorker(workers_, worker));

        if (!task_tracker_->WillPostTask(task.get()))
            return false;

        if (task->delayed_run_time.is_null()) {
            PostTaskWithSequenceNow(std::move(task), std::move(sequence), worker);
        } else {
            delayed_task_manager_->AddDelayedTask(std::move(task), std::move(sequence),
                worker, this);
        }

        return true;
    }

    void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow(
        std::unique_ptr<Task> task,
        scoped_refptr<Sequence> sequence,
        SchedulerWorker* worker)
    {
        DCHECK(task);
        DCHECK(sequence);
        DCHECK(!worker || ContainsWorker(workers_, worker));

        // Confirm that |task| is ready to run (its delayed run time is either null or
        // in the past).
        DCHECK_LE(task->delayed_run_time, delayed_task_manager_->Now());

        // Because |worker| belongs to this worker pool, we know that the type
        // of its delegate is SchedulerWorkerDelegateImpl.
        PriorityQueue* const priority_queue = worker
            ? static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate())
                  ->single_threaded_priority_queue()
            : &shared_priority_queue_;
        DCHECK(priority_queue);

        const bool sequence_was_empty = sequence->PushTask(std::move(task));
        if (sequence_was_empty) {
            // Insert |sequence| in |priority_queue| if it was empty before |task| was
            // inserted into it. Otherwise, one of these must be true:
            // - |sequence| is already in a PriorityQueue (not necessarily
            //   |shared_priority_queue_|), or,
            // - A worker is running a Task from |sequence|. It will insert |sequence|
            //   in a PriorityQueue once it's done running the Task.
            const auto sequence_sort_key = sequence->GetSortKey();
            priority_queue->BeginTransaction()->Push(std::move(sequence),
                sequence_sort_key);

            // Wake up a worker to process |sequence|.
            if (worker)
                worker->WakeUp();
            else
                WakeUpOneWorker();
        }
    }

    SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
        SchedulerWorkerDelegateImpl(
            SchedulerWorkerPoolImpl* outer,
            const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
            const PriorityQueue* shared_priority_queue,
            int index)
        : outer_(outer)
        , re_enqueue_sequence_callback_(re_enqueue_sequence_callback)
        , single_threaded_priority_queue_(shared_priority_queue)
        , index_(index)
    {
    }

    SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
        ~SchedulerWorkerDelegateImpl()
        = default;

    void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry(
        SchedulerWorker* worker)
    {
#if DCHECK_IS_ON()
        // Wait for |outer_->workers_created_| to avoid traversing
        // |outer_->workers_| while it is being filled by Initialize().
        outer_->workers_created_.Wait();
        DCHECK(ContainsWorker(outer_->workers_, worker));
#endif

        PlatformThread::SetName(
            StringPrintf("%sWorker%d", outer_->name_.c_str(), index_));

        DCHECK(!tls_current_worker.Get().Get());
        DCHECK(!tls_current_worker_pool.Get().Get());
        tls_current_worker.Get().Set(worker);
        tls_current_worker_pool.Get().Set(outer_);

        ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ == IORestriction::ALLOWED);
    }

    scoped_refptr<Sequence>
    SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
        SchedulerWorker* worker)
    {
        DCHECK(ContainsWorker(outer_->workers_, worker));

        scoped_refptr<Sequence> sequence;
        {
            std::unique_ptr<PriorityQueue::Transaction> shared_transaction(
                outer_->shared_priority_queue_.BeginTransaction());
            std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction(
                single_threaded_priority_queue_.BeginTransaction());

            if (shared_transaction->IsEmpty() && single_threaded_transaction->IsEmpty()) {
                single_threaded_transaction.reset();

                // |shared_transaction| is kept alive while |worker| is added to
                // |idle_workers_stack_| to avoid this race:
                // 1. This thread creates a Transaction, finds |shared_priority_queue_|
                //    empty and ends the Transaction.
                // 2. Other thread creates a Transaction, inserts a Sequence into
                //    |shared_priority_queue_| and ends the Transaction. This can't happen
                //    if the Transaction of step 1 is still active because because there
                //    can only be one active Transaction per PriorityQueue at a time.
                // 3. Other thread calls WakeUpOneWorker(). No thread is woken up because
                //    |idle_workers_stack_| is empty.
                // 4. This thread adds itself to |idle_workers_stack_| and goes to sleep.
                //    No thread runs the Sequence inserted in step 2.
                outer_->AddToIdleWorkersStack(worker);
                return nullptr;
            }

            // True if both PriorityQueues have Sequences and the Sequence at the top of
            // the shared PriorityQueue is more important.
            const bool shared_sequence_is_more_important = !shared_transaction->IsEmpty() && !single_threaded_transaction->IsEmpty() && shared_transaction->PeekSortKey() > single_threaded_transaction->PeekSortKey();

            if (single_threaded_transaction->IsEmpty() || shared_sequence_is_more_important) {
                sequence = shared_transaction->PopSequence();
                last_sequence_is_single_threaded_ = false;
            } else {
                DCHECK(!single_threaded_transaction->IsEmpty());
                sequence = single_threaded_transaction->PopSequence();
                last_sequence_is_single_threaded_ = true;
            }
        }
        DCHECK(sequence);

        outer_->RemoveFromIdleWorkersStack(worker);
        return sequence;
    }

    void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
        ReEnqueueSequence(scoped_refptr<Sequence> sequence)
    {
        if (last_sequence_is_single_threaded_) {
            // A single-threaded Sequence is always re-enqueued in the single-threaded
            // PriorityQueue from which it was extracted.
            const SequenceSortKey sequence_sort_key = sequence->GetSortKey();
            single_threaded_priority_queue_.BeginTransaction()->Push(
                std::move(sequence), sequence_sort_key);
        } else {
            // |re_enqueue_sequence_callback_| will determine in which PriorityQueue
            // |sequence| must be enqueued.
            re_enqueue_sequence_callback_.Run(std::move(sequence));
        }
    }

    TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
        GetSleepTimeout()
    {
        return TimeDelta::Max();
    }

    bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach(
        SchedulerWorker* worker)
    {
        return false;
    }

    SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
        StringPiece name,
        IORestriction io_restriction,
        TaskTracker* task_tracker,
        DelayedTaskManager* delayed_task_manager)
        : name_(name.as_string())
        , io_restriction_(io_restriction)
        , idle_workers_stack_lock_(shared_priority_queue_.container_lock())
        , idle_workers_stack_cv_for_testing_(
              idle_workers_stack_lock_.CreateConditionVariable())
        , join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL,
              WaitableEvent::InitialState::NOT_SIGNALED)
        ,
#if DCHECK_IS_ON()
        workers_created_(WaitableEvent::ResetPolicy::MANUAL,
            WaitableEvent::InitialState::NOT_SIGNALED)
        ,
#endif
        task_tracker_(task_tracker)
        , delayed_task_manager_(delayed_task_manager)
    {
        DCHECK(task_tracker_);
        DCHECK(delayed_task_manager_);
    }

    bool SchedulerWorkerPoolImpl::Initialize(
        ThreadPriority thread_priority,
        size_t max_threads,
        const ReEnqueueSequenceCallback& re_enqueue_sequence_callback)
    {
        AutoSchedulerLock auto_lock(idle_workers_stack_lock_);

        DCHECK(workers_.empty());

        for (size_t i = 0; i < max_threads; ++i) {
            std::unique_ptr<SchedulerWorker> worker = SchedulerWorker::Create(
                thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl(this, re_enqueue_sequence_callback, &shared_priority_queue_, static_cast<int>(i))),
                task_tracker_,
                SchedulerWorker::InitialState::ALIVE);
            if (!worker)
                break;
            idle_workers_stack_.Push(worker.get());
            workers_.push_back(std::move(worker));
        }

#if DCHECK_IS_ON()
        workers_created_.Signal();
#endif

        return !workers_.empty();
    }

    void SchedulerWorkerPoolImpl::WakeUpOneWorker()
    {
        SchedulerWorker* worker;
        {
            AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
            worker = idle_workers_stack_.Pop();
        }
        if (worker)
            worker->WakeUp();
    }

    void SchedulerWorkerPoolImpl::AddToIdleWorkersStack(
        SchedulerWorker* worker)
    {
        AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
        idle_workers_stack_.Push(worker);
        DCHECK_LE(idle_workers_stack_.Size(), workers_.size());

        if (idle_workers_stack_.Size() == workers_.size())
            idle_workers_stack_cv_for_testing_->Broadcast();
    }

    void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack(
        SchedulerWorker* worker)
    {
        AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
        idle_workers_stack_.Remove(worker);
    }

} // namespace internal
} // namespace base
